Assignment 03

Author: Thomas Primero

Affiliation: Boston University

Published: September 23, 2025

Modified: September 23, 2025

1 Load the Dataset

Load the Raw Dataset: Use PySpark to load the `lightcast_data.csv` file into a DataFrame. You can reuse the previous code. Copying code from your friend constitutes plagiarism. DO NOT DO THIS.

import pandas as pd
import plotly.express as px
import plotly.io as pio
from pyspark.sql import SparkSession
import re
import numpy as np
import plotly.graph_objects as go
from pyspark.sql.functions import col, split, explode, regexp_replace, transform, when
from pyspark.sql import functions as F
from pyspark.sql.functions import col, monotonically_increasing_id


np.random.seed(42)

pio.renderers.default = "notebook"

# Initialize Spark Session
spark = SparkSession.builder.appName("LightcastData").getOrCreate()

# Load Data
df = spark.read.option("header", "true").option("inferSchema", "true").option("multiLine","true").option("escape", "\"").csv("/home/ubuntu/assignment-03-t-primero/data/lightcast_job_postings.csv")
df.createOrReplaceTempView("job_postings")

# Show Schema and Sample Data
# print("--- Diagnostic check; no need to print it in the final doc ---")

# df.printSchema() # comment this line when rendering the submission
# df.show(5)

2 Data Preparation

We convert the numerical columns to floats so that we can perform numeric operations on them, such as computing averages.

#  Step 1: Casting salary and experience columns
df = df.withColumn("SALARY", col("SALARY").cast("float")) \
     .withColumn("SALARY_FROM", col("SALARY_FROM").cast("float")) \
     .withColumn("SALARY_TO", col("SALARY_TO").cast("float")) \
     .withColumn("MIN_YEARS_EXPERIENCE", col("MIN_YEARS_EXPERIENCE").cast("float")) \
     .withColumn("MAX_YEARS_EXPERIENCE", col("MAX_YEARS_EXPERIENCE").cast("float"))

# Step 2: Computing medians for salary columns
def compute_median(sdf, col_name):
    q = sdf.approxQuantile(col_name, [0.5], 0.01)
    return q[0] if q else None

median_from = compute_median(df, "SALARY_FROM")
median_to = compute_median(df, "SALARY_TO")
median_salary = compute_median(df, "SALARY")

print("Medians:", median_from, median_to, median_salary)

# Step 3: Imputing missing salaries, but not experience
df = df.fillna({
    "SALARY_FROM": median_from,
    "SALARY_TO": median_to
})

# Step 4: Computing average salary
df = df.withColumn("Average_Salary", (col("SALARY_FROM") + col("SALARY_TO")) / 2)

# Step 5: Selecting required columns
export_cols = [
    "EDUCATION_LEVELS_NAME",
    "REMOTE_TYPE_NAME",
    "MAX_YEARS_EXPERIENCE",
    "Average_Salary",
    "LOT_V6_SPECIALIZED_OCCUPATION_NAME"
]
df_selected = df.select(*export_cols)

# Step 6: Saving to CSV
pdf = df_selected.toPandas()
pdf.to_csv("./data/lightcast_cleaned.csv", index=False)      

print("Data cleaning complete. Rows retained:", len(pdf))
Medians: 87295.0 130042.0 115024.0
Data cleaning complete. Rows retained: 72498
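The imputation and averaging steps above can be sketched on a toy pandas frame. The column names mirror the real data, but the values below are made up purely for illustration:

```python
import pandas as pd

# Toy frame with missing salary bounds (values are made up)
toy = pd.DataFrame({
    "SALARY_FROM": [80000.0, None, 90000.0],
    "SALARY_TO": [120000.0, 140000.0, None],
})

# Fill each column's gaps with its own median, then average the bounds
medians = {c: toy[c].median() for c in ["SALARY_FROM", "SALARY_TO"]}
toy = toy.fillna(medians)
toy["Average_Salary"] = (toy["SALARY_FROM"] + toy["SALARY_TO"]) / 2
print(toy)
```

Each column is imputed independently, which is why a row can end up mixing an observed bound with a median-filled one.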
# Histogram of SALARY distribution
# salary_df = df.filter(col("SALARY").isNotNull() & (col("SALARY") > 0))
#fig = px.histogram(salary_df.toPandas(), x="SALARY", nbins=50, title="Salary Distribution")
#fig.update_layout(bargap=0.1)

3 Salary Distribution by Industry and Employment Type

Compare salary variations across industries.

- Filter the dataset: remove records where salary is missing or zero.
- Aggregate data: group by NAICS industry codes, then by employment type, and compute the salary distribution.
- Visualize results: create a box plot where the X-axis is `NAICS2_NAME`, the Y-axis is `SALARY_FROM`, `SALARY_TO`, or `SALARY`, grouped by `EMPLOYMENT_TYPE_NAME`. Customize colors, fonts, and styles.
- Explanation: write two sentences about what the graph reveals.

# Your Code for 1st question here
import pandas as pd

# Filter out missing or zero salary values
pdf = df.filter(df["SALARY"] > 0).select("EMPLOYMENT_TYPE_NAME", "SALARY").toPandas()
# pdf.head()

# Clean employment type names for better readability
# Strip non-ASCII symbols that were incorrectly embedded in the category names
pdf["EMPLOYMENT_TYPE_NAME"] = pdf["EMPLOYMENT_TYPE_NAME"].apply(lambda x: re.sub(r"[^\x00-\x7f]+", "", x))
# pdf.head()

# Compute median salary for sorting
median_salaries = pdf.groupby("EMPLOYMENT_TYPE_NAME")["SALARY"].median()
# median_salaries.head()

# Sort employment types based on median salary in descending order
sorted_employment_types = median_salaries.sort_values(ascending=False).index

# Apply sorted categories
pdf["EMPLOYMENT_TYPE_NAME"] = pd.Categorical(
    pdf["EMPLOYMENT_TYPE_NAME"],
    categories=sorted_employment_types,
    ordered=True
)

# Create box plot with horizontal grid lines
fig = px.box(
    pdf,
    x="EMPLOYMENT_TYPE_NAME",
    y="SALARY",
    title="Salary Distribution by Employment Type",
    color_discrete_sequence=["orange"],  # Single neutral color
    boxmode="group",
    points="all",  # Show all outliers
)
# Improve layout, font styles, and axis labels
fig.update_layout(
    title=dict(
        text="Salary Distribution by Employment Type",
        font=dict(size=26, family="Verdana", color="black", weight="bold")  # Bigger & Bold Title
    ),
    xaxis=dict(
        title=dict(text="Employment Type", font=dict(size=22, family="Verdana", color="black", weight="bold")),  # Bigger X-label
        tickangle=0,  # Keep X-axis labels horizontal
        tickfont=dict(size=18, family="Verdana", color="black", weight="bold"),  # Bigger & Bold X-ticks
        showline=True,  # Show axis lines
        linewidth=2,  # Thicker axis lines
        linecolor="black",
        mirror=True,
        showgrid=False,  # Remove vertical grid lines
        categoryorder="array",
        categoryarray=sorted_employment_types.tolist()
    ),
    yaxis=dict(
        title=dict(text="Salary (Thousands)", font=dict(size=22, family="Verdana", color="black", weight="bold")),  # Bigger Y-label
        tickvals=[0, 50000, 100000, 150000, 200000, 250000, 300000, 350000, 400000, 450000, 500000],
        ticktext=["0", "50", "100", "150", "200", "250", "300", "350", "400", "450", "500"],
        tickfont=dict(size=18, family="Verdana", color="black", weight="bold"),  # Bigger & Bold Y-ticks
        showline=True,
        linewidth=2,
        linecolor="black",
        mirror=True,
        showgrid=True,  # Enable light horizontal grid lines
        gridcolor="lightgray",  # Light shade for the horizontal grid
        gridwidth=0.5  # Thin grid lines
    ),
    font=dict(family="Verdana", size=16, color="black"),
    boxgap=0.5,
    plot_bgcolor="white",
    paper_bgcolor="white",
    showlegend=False,
    height=800,
    width=900
)

# Show the figure
fig.show()
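The label cleanup and median-based ordering used above can be checked on a small pandas example. The category names and salaries here are toy values, not the real postings:

```python
import re
import pandas as pd

# Toy data: one category name carries a stray non-ASCII character
toy = pd.DataFrame({
    "EMPLOYMENT_TYPE_NAME": ["Full-time (> 32 hours)\u2265", "Part-time",
                             "Full-time (> 32 hours)\u2265"],
    "SALARY": [120000.0, 60000.0, 100000.0],
})

# Strip non-ASCII characters that leaked into the category names
toy["EMPLOYMENT_TYPE_NAME"] = toy["EMPLOYMENT_TYPE_NAME"].apply(
    lambda x: re.sub(r"[^\x00-\x7f]+", "", x)
)

# Order categories by descending median salary, as done for the box plot
order = (toy.groupby("EMPLOYMENT_TYPE_NAME")["SALARY"]
            .median().sort_values(ascending=False).index)
print(list(order))
```

Cleaning before grouping matters: if the stray characters were left in, visually identical labels could be treated as distinct categories.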

4 Salary Distribution by Industry

pdf = df.select("NAICS2_NAME", "SALARY").toPandas()
fig = px.box(pdf, x="NAICS2_NAME", y="SALARY", title="Salary Distribution by Industry", color_discrete_sequence=["#EF553B"])
fig.update_layout(font_family="Arial", title_font_size=16,
                  height=1000,
                  width=1200)
# Rotate x-axis labels for readability
fig.update_xaxes(tickangle=45, tickfont=dict(size=12))
fig.show()

5 Salary Analysis by ONET Occupation Type (Bubble Chart)

Analyze how salaries differ across ONET occupation types.

- Aggregate data: compute the median salary for each occupation in the ONET taxonomy.
- Visualize results: create a bubble chart where the X-axis is `ONET_NAME`, the Y-axis is the median salary, and bubble size is the number of job postings. Apply custom colors and font styles.
- Explanation: write two sentences about what the graph reveals.

```{python}
# Step 1: Aggregate median salary and posting counts per ONET occupation
salary_pd = (
    df.filter(col("SALARY") > 0)
    .groupBy("ONET_NAME")
    .agg(F.expr("percentile_approx(SALARY, 0.5)").alias("Median_Salary"),
         F.count("*").alias("Job_Postings"))
    .toPandas()
)

# Step 2: Bubble chart using Plotly
fig = px.scatter(
    salary_pd, x="ONET_NAME", y="Median_Salary", size="Job_Postings",
    title="Salary Analysis by ONET Occupation Type (Bubble Chart)",
    labels={"ONET_NAME": "ONET Occupation", "Median_Salary": "Median Salary",
            "Job_Postings": "Number of Job Postings"},
    hover_name="ONET_NAME", size_max=60,
)
fig.show()
```
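For reference, the same aggregation can be done in plain pandas once the postings are collected locally. The occupation names and salaries below are illustrative, not taken from the dataset:

```python
import pandas as pd

# Toy postings: two occupations, illustrative salaries
jobs = pd.DataFrame({
    "ONET_NAME": ["Data Scientists", "Data Scientists", "Web Developers"],
    "SALARY": [120000.0, 140000.0, 90000.0],
})

# Median salary and posting count per occupation
salary_pd = (jobs.groupby("ONET_NAME", as_index=False)["SALARY"]
                 .agg(Median_Salary="median", Job_Postings="count"))
print(salary_pd)
```

On the full dataset the Spark `percentile_approx` version is preferable, since it aggregates before collecting to the driver.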